Cloud Storageトリガー で Cloud Functions 第2世代 を動かして GCS から SFTPサーバ にファイル転送してみた
こんにちは!エノカワです。
Google Cloud Functions は、クラウド サービスの構築と接続に使用するサーバーレスのランタイム環境です。
Cloud Functions には、第1世代 と 第2世代 の2種類のプロダクトがありますが、第2世代は Cloud Run と Eventarc をベースにした新しいバージョンで機能が強化されています。
今回はCloud Functions 第2世代 を動かして GCS から SFTPサーバ にファイル転送を試してみたのでご紹介します。
鍵認証のSFTPサーバをたてる
Cloud Functions 関数を作成する前に ファイル転送先のSFTPサーバをたてます。
下記エントリで、Cloud Composer で Cloud Storage にあるファイルをSFTPサーバに転送するワークロードを紹介しました。
今回も同様の手順で鍵認証のSFTPサーバをたてます。
詳細は上記エントリの[SFTPサーバをたてる]、[鍵認証の設定]、[Secret Manager に 秘密鍵を保存]の箇所を参照ください。
VMインスタンス作成
test-sftp-server
という名前で、東京リージョン、最小のマシン構成を選択し、他はデフォルトのままで作成
フォルダ作成
- [SSH]リンクからインスタンスに接続し、
sftp_user
ユーザーの追加とupload
フォルダを作成
鍵認証の設定
- Cloud Shell からssh-keygenコマンドを使用して、SSH鍵ペアを生成
test-sftp-server
インスタンスの編集画面に入り、[SSH 認証鍵]項目に公開鍵の内容を登録
Secret Manager に 秘密鍵を保存
- 秘密鍵を保管するシークレットとバージョンを作成
- Compute Engine のデフォルトのサービスアカウントに Secret Manager のシークレット アクセサー (
roles/secretmanager.secretAccessor
)のロールを付与
GCSバケットを作成
- 転送ファイルをアップロードするGCSバケット
test-gcs-sftp
を作成
Cloud Functions 関数を作成
準備が整ったので、Cloud Functions 関数を作成していきます。
Google Cloud コンソールで Cloud Functions の[関数の作成]ページに移動します。
環境は第2世代
を選択します。
トリガーはCloud Storage トリガー
を選択したいところですが、どのように設定するのでしょうか?
ドキュメントに 第2世代 でサポートされているトリガーについての記載がありました。
Cloud Functions(第 2 世代)のすべてのイベント ドリブン関数は、イベント配信に Eventarc トリガーを使用します。
Cloud Functions(第 2 世代)では、Pub/Sub トリガーと Cloud Storage トリガーは、Eventarc トリガーの一種として実装されます。
Cloud Functions(第 2 世代)でサポートされているトリガー
Cloud Storage トリガーは、Eventarc トリガーとして実装が必要とのことなので、[EVENTARC トリガーを追加]ボタンをクリックします。
Eventarc トリガーを追加
Eventarc トリガーの設定画面が表示されますので、以下の項目を選択します。
- トリガーのタイプ
Google のソース
- イベント プロバイダ
Cloud Storage
- イベント
google.cloud.storage.object.v1.finalized
- バケット
test-gcs-sftp
google.cloud.storage.object.v1.finalized
は、オブジェクトのファイナライズイベントです。
新しいオブジェクトが作成されるか、既存のオブジェクトが上書きされ、そのオブジェクトの新しい世代が作成されると送信されます。
Eventarc トリガーを追加するにあたって必要な権限が不足している場合は以下のようなメッセージが表示されます。
- Cloud Pub/Sub でIDトークンを作成するには、Pub/Sub のサービスアカウントに
roles/iam.serviceAccountTokenCreator
ロールの付与が必要 - Cloud Storage 経由でイベントを受け取るために、Cloud Storage のサービスアカウントに
roles/pubsub.publisher
ロールの付与が必要
上記ロールが足りていないようなので、[付与]ボタンをクリックして必要なロールを付与します。
ロールが付与されたので、[トリガーを保存]ボタンをクリックします。
Eventarc トリガーが追加されました。
ランタイム環境変数を追加
関数内でSFTPサーバの接続情報を参照するため、ランタイム環境変数に追加しておきます。
[次へ]ボタンをクリックして次の画面に遷移します。
第2世代を使用するのに必要なAPIが有効化されていない場合、以下のような画面が表示されます。
[有効にする]ボタンをクリックして、有効化します。
関数をデプロイ
GCS から SFTPサーバ にファイル転送する関数を作成します。
今回は以下の内容でデプロイしました。
ランタイム
Python 3.10
エントリポイント
copy_file_from_gcs_to_sftp
ソースコード
import os from io import StringIO import functions_framework import paramiko from google.cloud import secretmanager, storage def get_secret_version(project_id, secret_id, version_id="latest"): client = secretmanager.SecretManagerServiceClient() name = f"projects/{project_id}/secrets/{secret_id}/versions/{version_id}" response = client.access_secret_version(request={"name": name}) payload = response.payload.data.decode("UTF-8") return payload def get_fileblob(bucket_name, file_name): gcs_client = storage.Client() bucket = gcs_client.get_bucket(bucket_name) return bucket.blob(file_name) def write_sftp_file(host, port, user, dst_path, bucket_name, file_name, private_key): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(host, port=port, username=user, pkey=private_key, timeout=5.0) sftp_connection = client.open_sftp() file_path = f"{dst_path}{file_name}" with sftp_connection.file(file_path, "w") as target: fileblob = get_fileblob(bucket_name, file_name) fileblob.download_to_file(target) sftp_connection.close() client.close() @functions_framework.cloud_event def copy_file_from_gcs_to_sftp(cloud_event): PROJECT_ID = os.getenv("PROJECT_ID") SFTP_HOST = os.getenv("SFTP_HOST") SFTP_PORT = os.getenv("SFTP_PORT") SFTP_USER = os.getenv("SFTP_USER") SECRET_ID_SFTP = os.getenv("SECRET_ID_SFTP") DEST_PATH = os.getenv("DEST_PATH") data = cloud_event.data key_stream = get_secret_version(PROJECT_ID, SECRET_ID_SFTP) private_key = paramiko.RSAKey.from_private_key(StringIO(key_stream)) write_sftp_file(SFTP_HOST, int(SFTP_PORT), SFTP_USER, DEST_PATH, data["bucket"], data["name"], private_key)
functions-framework==3.* google-cloud-secret-manager>=2.12.0 google-cloud-storage>=2.4.0 paramiko>=2.8.0
[デプロイ]ボタンをクリックしてしばらくすると、デプロイされました。
Pub/Sub の[トピック]ページに移動すると、デプロイした関数のトピックが作成されていました。
転送ファイルをアップロード
では、Cloud Functions を動かしてみましょう!
SFTPサーバに転送するファイルを Cloud Storage にアップロードします。
今回はtest-gcs-sftp
バケットにtrigger_file.txt
をアップロードしました。
転送ファイルを確認
SFTPサーバにファイルが転送されているか確認してみましょう。
test-sftp-server
インスタンスに接続して、upload
フォルダ配下をチェックします。
trigger_file.txt
ファイルが居てました。
ファイル転送成功です!
まとめ
以上、Cloud Functions 第2世代 を動かして GCS から SFTPサーバ にファイル転送してみました。
Eventarc トリガーの追加や関数名に-
が使用できないなど第1世代と異なる点はありましたが、デプロイに手間取ることはありませんでした。
今回はコンソール上で行ったのですが、ロールの追加やAPIの有効化など必要な操作をその場で確認しながら進めることができたのもその理由だと感じました。
第2世代では、Eventarc でサポートされているすべてのイベントタイプのトリガーをサポートしているそうなので、他も試してみたいと思います!
参考
- Cloud Functions バージョンの比較 | Google Cloud Functions に関するドキュメント
- イベント ドリブン関数を作成する | Google Cloud Functions に関するドキュメント
- Cloud Functions トリガー | Google Cloud Functions に関するドキュメント
- Cloud Storage トリガー | Google Cloud Functions に関するドキュメント
- CloudEvents 形式 - HTTP プロトコル バインディング | Eventarc | Google Cloud
- SFTP — Paramiko documentation
- Allow transfring files from GCS to SFTP · Issue #7844 · googleapis/google-cloud-python
- How to Setup a sFTP Server in Google Cloud Platform and Restrict Access | by Rubens Zimbres | Medium
- シークレットの作成とアクセス | Secret Manager のドキュメント | Google Cloud
- IAM を使用したアクセス制御 | Secret Manager のドキュメント | Google Cloud